热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

技术|使用FlinkSQL传输市场数据1:传输VWAP

FlinkSQL是一种数据处理语言,用于事件驱动和流应用程序的快速原型设计和开发。他将SQL的简单性和可访问性与Flink的性能和可伸缩性结合在一起。借助FlinkSQL,大家都可

点击Cloudera中国 即可订阅!

本文是一个由多部分组成的系列文章的第一篇,展示了 FlinkSQL 应用于市场数据的强大功能和可表达性。该系列的代码和数据可在 github 上获得。它由量化建模负责人 Simudyne 和 Krishnen Vytelingum 合着。

速度在金融市场上至关重要。无论目标是最大化 alpha 还是最大程度地减少风险,金融技术人员都会投入大量资金,以获取有关市场状况以及行情的最新见解。事件驱动和流式处理体系结构可在事件发生时对事件进行复杂的处理,使其很自然地适合金融市场应用。


Flink SQL 是一种数据处理语言,可用于事件驱动和流应用程序的快速原型设计和开发。Flink SQL 将 SQL 的简单性和可访问性与 Apache Flink(一种流行的分布式流媒体平台)的性能和可伸缩性结合在一起。借助 Flink SQL,业务分析人员、开发人员和量化人员都可以快速建立流传输管道,以实时执行复杂的数据分析。


在本文中,我们将使用 Simudyne 开发的基于代理的模型(ABM)生成的综合市场数据。ABM 并不是自上而下的方法,而是在复杂系统中对自主参与者(或代理)进行建模,例如:金融市场中的各种买卖双方。可以捕获这些交互,并可以针对许多应用程序分析生成的综合数据集,例如用于检测紧急欺诈行为的训练模型,或探索风险管理的“假设”场景。ABM 生成的综合数据在历史数据不足或不可用的情况下很有用。


流式 VWAP

我们从一个简单的示例开始,该示例从一系列交易事件中计算成交量加权平均价格(VWAP)。VWAP 是交易中用来衡量证券的市场价格和未来方向的通用基准。在这里,我们有一个 CSV 格式的数据集,该数据集显示了一个交易日(2020年10月22日)的虚构证券(SIMUI)的交易事件。 

    sym,prc,vol,bid_id,ask_id,buyer_id,seller_id,step,time
    SIMUl,149.86,2300,P|63-m-1,P|66-l-0,P|63,P|66,380,22-Oct-2020 08:00:07.600
    SIMUl,149.86,1935,P|63-m-1,P|25-l-0,P|63,P|25,380,22-Oct-2020 08:00:07.600
    SIMUl,149.74,582,P|18-l-0,P|98-m-0,P|18,P|98,428,22-Oct-2020 08:00:08.560
    SIMUl,149.76,2475,P|27-l-0,P|42-m-1,P|27,P|42,1021,22-Oct-2020 08:00:20.420
    SIMUl,149.84,21,P|5-m-0,P|42-l-0,P|5,P|42,1078,22-Oct-2020 08:00:21.560
    SIMUl,149.76,2709,P|24-l-1,P|92-m-0,P|24,P|92,1200,22-Oct-2020 08:00:24.000
    SIMUl,149.84,1653,P|8-m-1,P|24-l-0,P|8,P|24,1513,22-Oct-2020 08:00:30.260
    SIMUl,149.84,400,P|19-m-0,P|24-l-0,P|19,P|24,1577,22-Oct-2020 08:00:31.540

    这些列是:交易品种,价格,数量,出价 ID,要价 ID,买方 ID,卖方 ID,步骤和时间戳。步骤列是离散步骤 ABM 市场模拟的伪像,出于我们的目的可以忽略;其余各栏不言自明。


    要处理此数据,我们需要通过发出 CREATE TABLE 语句来声明 Flink SQL 表。我们的示例数据是基于文件系统的,但是可以轻松更改连接器类型以从其他来源(例如Kafka主题)读取数据。请注意,event_time 是派生的列,也用于水印。通过加水印,Flink 可以限制等待延迟到达和故障事件的时间,以便可以取得进展。在这里,我们声明,到达 event_time 超过水印一分钟以上的记录将被忽略。

      CREATE TABLE trades (
      symbol STRING,
      price DOUBLE,
      vol INT,
      bid_id STRING,
      ask_id STRING,
      buyer_id STRING,
      seller_id STRING,
      step INT,
      ts_str STRING,
      event_time AS TO_TIMESTAMP (ts_str, 'dd-MMM-yyyy HH:mm:ss.SSS'),
      WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
      ) WITH (
      'connector' = 'filesystem',
      'path' = '/path/to/varstream/data/trades_raw',
      'format' = 'csv'
      );

      VWAP 的公式很简单:对于指定时间段内的每笔交易,将价格乘以交易股份数即可。将其总和除以该时间段内已交易的股票总数。下面的流查询将显示当前的 VWAP,它将随着新交易事件的到来而更新:

        SELECT
        symbol,
        SUM (vol)                     AS cumulative_volume,
        SUM (price * vol)             AS cumulative_pv,
        SUM (price * vol) / SUM (vol) AS vwap
        FROM
        trades
        GROUP BY
        symbol
        ;


        实时播放

        由于 CSV 文件中一个符号中只有一天的数据价值,因此结果更新可能发生得太快了,您几乎没有注意到。从源读取事件的速度比实时发生的速度要快。有时需要在准实时回放历史数据,就好像 Flink 现在正在接收历史事件数据(例如,用于演示或原型设计和开发过程中)。


        为了解决这个问题,我们提供了一个简单的 UDTF(用户定义的表函数),该数据以从行时间戳派生的人工延迟播放历史数据。UDTF 有两个参数:第二个参数指定行时间戳(在我们的示例中为 event_time ),而第一个参数指定第一个行时间戳之后的分钟持续时间(以分钟为单位),以开始应用延迟。以下代码段显示了如何注册 UDTF 并在处理事件的前120分钟后将其用于视图中以应用延迟。请注意 LATERAL TABLE 联接的使用,该联接将函数应用于主表中的每一行。

          -- Register UDTF
          CREATE FUNCTION replay_after AS 'varstream.ReplayAfterFunction' LANGUAGE JAVA ;
          -- Create a view
          CREATE VIEW trades_replay AS (
          SELECT * FROM trades
          LEFT JOIN LATERAL TABLE (replay_after (120, trades.event_time)) ON TRUE
          ) ;

          您可以通过发出一个简单的查询来验证事件的重播方式:

          SELECT * FROM trades_replay

          使用此视图,我们现在可以发出相同的 VWAP 聚合查询,并观察对 VWAP 的流更新,就好像它们是实时发生的一样:

            SELECT
            symbol,
            SUM (vol) AS cumulative_volume,
            SUM (price * vol) AS cumulative_pv,
            SUM (price * vol) SUM (vol) AS vwap
            FROM
            trades_replay
            GROUP BY
            symbol
            ;

            尽管此 UDTF 在进行原型制作时非常有用,但从根本上没有打算把它用于生产用途。我们在这里使用它只是为了演示 FlinkSQL 如何在事件以模拟实时到达时更新聚合结果。


            Group Windows

            前面的示例显示了如何计算当天的流式 VWAP。假设您要以每隔1分钟的时间建立一个带有蜡烛图的交易仪表板。您可能需要计算每分钟的 VWAP、高价、低价和总体积。Flink SQL 通过组窗口使此操作变得容易,组窗口可以在 GROUP BY 时间间隔上应用聚合函数。


            下面显示了如何获取每分钟的 VWAP:

              CREATE VIEW vwap_1m AS (
              SELECT
              symbol,
              TUMBLE_START (event_time, INTERVAL '1' MINUTES) AS start_time,
              TUMBLE_ROWTIME (event_time, INTERVAL '1' MINUTES) AS row_time,
              MAX (price) AS max_price,
              MIN (price) AS min_price,
              SUM (price * vol) AS total_price,
              SUM (vol) AS total_vol,
              SUM (price * vol) SUM (vol) AS vwap
              FROM
              trades
              GROUP BY
              TUMBLE (event_time, INTERVAL '1' MINUTES), symbol
              );
              SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_1m ;

              前面的操作为每分钟内发生的交易计算了 VWAP。如果要在几分钟内计算移动的 VWAP(MVWAP),则 Flink SQL 提供了一个跳跃的组窗口。下面显示了5分钟的移动 VWAP,步长为1分钟。

                CREATE VIEW vwap_5m AS (
                SELECT
                symbol,
                HOP_START (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS start_time,
                HOP_ROWTIME (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS row_time,
                MAX (price) AS max_price,
                MIN (price) AS min_price,
                SUM (price * vol) AS total_price,
                SUM (vol) AS total_vol,
                SUM (price * vol) / SUM (vol) AS vwap
                FROM
                trades
                GROUP BY
                HOP (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES), symbol
                );
                SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_5m ;

                结论

                Flink SQL 可以极大地简化和加快流数据流的开发。在本文中,我们探索了 SQL GROUP BY 子句的不同用法,以根据市场数据流计算 VWAP 的变化。在下一部分中,我们将向您展示如何从市场数据中提取每分钟的流式采样,以计算日内风险价值(IVaR)。我们希望本系列文章能鼓励您尝试将 Flink SQL 用于流式市场数据应用程序。

                原文作者

                Patrick Angeles& Krishnen Vytelingum

                原文链接

                https://blog.cloudera.com/streaming-market-data-with-flink-sql-part-i-streaming-vwap/


                Cloudera中国

                更多资讯,点击阅读原文

                长按扫码关注我们




                推荐阅读
                • 本文详细介绍了MySQL数据库的基础语法与核心操作,涵盖从基础概念到具体应用的多个方面。首先,文章从基础知识入手,逐步深入到创建和修改数据表的操作。接着,详细讲解了如何进行数据的插入、更新与删除。在查询部分,不仅介绍了DISTINCT和LIMIT的使用方法,还探讨了排序、过滤和通配符的应用。此外,文章还涵盖了计算字段以及多种函数的使用,包括文本处理、日期和时间处理及数值处理等。通过这些内容,读者可以全面掌握MySQL数据库的核心操作技巧。 ... [详细]
                • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
                • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
                • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
                • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
                • 本文介绍如何在将数据库从服务器复制到本地时,处理因外键约束导致的数据插入失败问题。 ... [详细]
                • importpymysql#一、直接连接mysql数据库'''coonpymysql.connect(host'192.168.*.*',u ... [详细]
                • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
                • 从0到1搭建大数据平台
                  从0到1搭建大数据平台 ... [详细]
                • 如何在Java中使用DButils类
                  这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
                • 本文总结了在SQL Server数据库中编写和优化存储过程的经验和技巧,旨在帮助数据库开发人员提升存储过程的性能和可维护性。 ... [详细]
                • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
                • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
                  在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
                • 在处理数据库中所有用户表的彻底清除时,目前尚未发现单一命令能够实现这一目标。因此,需要采用一种较为繁琐的方法来逐个删除相关表及其结构。具体操作可以通过编写PL/SQL脚本来实现,该脚本将动态生成并执行删除表的SQL语句。尽管这种方法相对复杂,但在缺乏更简便手段的情况下,仍是一种有效的解决方案。未来或许可以通过数据库管理工具或更高版本的数据库系统提供更简洁的处理方式。 ... [详细]
                • PTArchiver工作原理详解与应用分析
                  PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
                author-avatar
                HikariFocus_695
                这个家伙很懒,什么也没留下!
                PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
                Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有